{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "6d331100",
   "metadata": {},
   "source": [
    "# Construct data for Indeed without closed jobs\n",
    "In this notebook, we construct the data for Indeed when the last files do not contain open job postings.\n",
    "\n",
    "We use the inflows data, getting the last_date from the duration data and making some adjustments for missing open job postings.\n",
    "\n",
    "**Note:** some inflow files have 'NA' for every `company_name`; those values are filled in from the duration files."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b92ac7b1",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2022-04-14T13:21:04.650485Z",
     "start_time": "2022-04-14T13:21:02.898156Z"
    },
    "execution": {
     "iopub.execute_input": "2024-04-14T16:55:20.696844Z",
     "iopub.status.busy": "2024-04-14T16:55:20.696551Z",
     "iopub.status.idle": "2024-04-14T16:55:22.239867Z",
     "shell.execute_reply": "2024-04-14T16:55:22.239189Z"
    }
   },
   "outputs": [],
   "source": [
    "import sys\n",
    "from datetime import datetime\n",
    "import polars as pl\n",
    "import json\n",
    "\n",
    "sys.path.append(\"../\")\n",
    "import tools\n",
    "\n",
    "# CHANGE: set these three locations before running the notebook\n",
    "PATH = ''  # raw data\n",
    "path_data = ''  # analysis data\n",
    "path_output = ''  # curated data\n",
    "\n",
    "# polars display settings (string length 100, table columns 50)\n",
    "pl.Config.set_fmt_str_lengths(100)\n",
    "pl.Config.set_tbl_cols(50);"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fd53b9eb",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2024-04-14T16:55:22.243429Z",
     "iopub.status.busy": "2024-04-14T16:55:22.243095Z",
     "iopub.status.idle": "2024-04-14T16:56:19.510462Z",
     "shell.execute_reply": "2024-04-14T16:56:19.508293Z"
    }
   },
   "outputs": [],
   "source": [
    "# INFLOWS\n",
    "\n",
    "def read_csv_ignore_errors(file, cols=None, dtypes=None):\n",
    "    \"\"\"Read the raw CSV at ``PATH + file``, retrying leniently if parsing fails.\n",
    "\n",
    "    The first attempt is a normal ``pl.read_csv``. If it raises, the file name\n",
    "    is printed and the file is read again with ``ignore_errors=True``.\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    file : str\n",
    "        File name, appended to the module-level ``PATH``.\n",
    "    cols : optional\n",
    "        Forwarded to ``pl.read_csv`` as ``columns``.\n",
    "    dtypes : optional\n",
    "        Forwarded to ``pl.read_csv`` as ``dtypes``.\n",
    "\n",
    "    Returns\n",
    "    -------\n",
    "    The DataFrame produced by ``pl.read_csv``.\n",
    "    \"\"\"\n",
    "    read_options = dict(try_parse_dates=True, columns=cols, dtypes=dtypes)\n",
    "    try:\n",
    "        return pl.read_csv(PATH + file, **read_options)\n",
    "    except Exception:\n",
    "        # Deliberately not a bare `except:`: KeyboardInterrupt / SystemExit must\n",
    "        # still stop the notebook instead of triggering a second read.\n",
    "        print(file)  # report which file needed the lenient re-read\n",
    "        return pl.read_csv(PATH + file, ignore_errors=True, **read_options)\n",
    "\n",
    "# update filenames\n",
    "inflows_filenames = tools.FileNames(\n",
    "    new_filenames_format=r'CA_job_postings_202\\d{5}.csv',\n",
    "    path_df=f'{path_data}df_inflows_filenames.csv',\n",
    ")\n",
    "inflows_filenames.update_filenames()\n",
    "\n",
    "\n",
    "# construct inflows data\n",
    "\n",
    "# strings that stand for a missing value\n",
    "NULL_MAPPER = {'': None, 'NA': None}\n",
    "\n",
    "# company_name is forced to string in every file\n",
    "dfs = [\n",
    "    read_csv_ignore_errors(name, dtypes={'company_name': pl.Utf8})\n",
    "    for name in inflows_filenames.to_list()\n",
    "]\n",
    "\n",
    "df_inflows = (\n",
    "    pl.concat(dfs, how='diagonal')\n",
    "    .rename({'jlcity': 'city', 'jladmin1code': 'province'})\n",
    "    .with_columns(pl.col(pl.Utf8).map_dict(NULL_MAPPER, default=pl.first()))\n",
    "    .groupby('job_key')\n",
    "    .agg(\n",
    "        [\n",
    "            # max() picks a non-null value when the job has one\n",
    "            pl.col('company_name').max(),\n",
    "            pl.col('job_title').max(),\n",
    "            pl.col('city').max(),\n",
    "            pl.col('date_first_visible').min(),\n",
    "            # first row per job; presumably to_list() yields the most recent file first -- TODO confirm\n",
    "            pl.col('province').first(),\n",
    "        ]\n",
    "    )\n",
    ")\n",
    "\n",
    "print(df_inflows.describe())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1e566c5f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# most recent date_first_visible found in the inflow files\n",
    "print(df_inflows.get_column('date_first_visible').max())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f62de4c5",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2024-04-14T16:56:19.515346Z",
     "iopub.status.busy": "2024-04-14T16:56:19.514935Z",
     "iopub.status.idle": "2024-04-14T16:59:09.904623Z",
     "shell.execute_reply": "2024-04-14T16:59:09.903317Z"
    }
   },
   "outputs": [],
   "source": [
    "# DURATION FILES\n",
    "\n",
    "# Uses every duration file delivered after the May 2022 batch update, except\n",
    "# CA_duration_20230310.csv: it has no company_name and all of its jobs are\n",
    "# present in the following files.\n",
    "\n",
    "duration_filenames = tools.FileNames(\n",
    "    new_filenames_format=r'CA_duration_202\\d{5}.csv',\n",
    "    path_df=f'{path_data}df_duration_filenames.csv',\n",
    ")\n",
    "duration_filenames.update_filenames()\n",
    "\n",
    "\n",
    "# assemble duration files without open job postings from previous files\n",
    "\n",
    "# a file is treated as containing open postings when more than this many\n",
    "# job_keys share the file's latest last_date\n",
    "CUTOFF_OPEN_JOB_POSTINGS = 100_000\n",
    "\n",
    "def check_open_job_postings(df):\n",
    "    \"\"\"Return True if the file contains open job postings, False otherwise.\n",
    "\n",
    "    Counts the job_keys whose most recent ``last_date`` equals the latest\n",
    "    ``last_date`` in ``df`` and compares that count with\n",
    "    ``CUTOFF_OPEN_JOB_POSTINGS``.\n",
    "    \"\"\"\n",
    "    latest_per_job = df.groupby('job_key').agg([pl.col('last_date').max()])\n",
    "    on_final_day = latest_per_job.filter(\n",
    "        pl.col('last_date') == pl.col('last_date').max()\n",
    "    )\n",
    "    return on_final_day.height > CUTOFF_OPEN_JOB_POSTINGS\n",
    "\n",
    "\n",
    "dfs = []\n",
    "for file in duration_filenames.to_list():\n",
    "    # one row per job_key: the most recent last_date across multiple spells\n",
    "    df = (\n",
    "        read_csv_ignore_errors(file, cols=['job_key', 'last_date'])\n",
    "        .groupby('job_key')\n",
    "        .agg([pl.col('last_date').max()])\n",
    "    )\n",
    "    # Jobs still open on the file's last date must be removed; jobs that\n",
    "    # closed on that date will appear again in subsequent files.\n",
    "    if check_open_job_postings(df):\n",
    "        df = df.filter(pl.col('last_date') < pl.col('last_date').max())\n",
    "    dfs.append(df)\n",
    "\n",
    "# combine the files: latest last_date seen for each job_key\n",
    "df_last_dates = (\n",
    "    pl.concat(dfs)\n",
    "    .groupby('job_key')\n",
    "    .agg(pl.col('last_date').max().alias('last_date'))\n",
    ")\n",
    "\n",
    "# assemble duration files to get missing company_name, city, province (from inflows):\n",
    "# duration_days is read as int in some files and as str in others, so the\n",
    "# duration files get a reader that coerces that column to integer.\n",
    "def read_csv_ignore_errors_dur(file, cols=None, dtypes=None):\n",
    "    \"\"\"Read a duration file and coerce ``duration_days`` to Int64.\n",
    "\n",
    "    Reading, including the lenient retry, is delegated to\n",
    "    ``read_csv_ignore_errors``; ``file``, ``cols`` and ``dtypes`` are forwarded\n",
    "    unchanged. When the frame has a ``duration_days`` column, the placeholder\n",
    "    strings 'NA', 'na' and '' become null and the column is cast to Int64.\n",
    "    Frames without that column are returned exactly as read.\n",
    "    \"\"\"\n",
    "    df = read_csv_ignore_errors(file, cols=cols, dtypes=dtypes)\n",
    "    if 'duration_days' not in df.columns:\n",
    "        return df\n",
    "\n",
    "    duration = pl.col('duration_days')\n",
    "    if df.schema['duration_days'] == pl.Utf8:\n",
    "        # Only string columns can hold the placeholders. Any other non-numeric\n",
    "        # text still fails the cast below, as it did before.\n",
    "        duration = (\n",
    "            pl.when(duration.is_in(['NA', 'na', '']))\n",
    "            .then(pl.lit(None, dtype=pl.Utf8))\n",
    "            .otherwise(duration)\n",
    "        )\n",
    "    return df.with_columns(duration.cast(pl.Int64).alias('duration_days'))\n",
    "# read every duration file in full\n",
    "dfs = [read_csv_ignore_errors_dur(name) for name in duration_filenames.to_list()]\n",
    "\n",
    "# one row per job_key; max() is used to avoid null values\n",
    "df_missing_features = (\n",
    "    pl.concat(dfs, how='diagonal')\n",
    "    .rename({'jlcity': 'city'})\n",
    "    .with_columns(pl.col(pl.Utf8).map_dict(NULL_MAPPER, default=pl.first()))\n",
    "    .groupby('job_key')\n",
    "    .agg(\n",
    "        [\n",
    "            pl.col(feature).max().alias(feature)\n",
    "            for feature in (\n",
    "                'city',\n",
    "                'company_name',\n",
    "                'job_title',\n",
    "                'searchable_days',\n",
    "                'duration_days',\n",
    "            )\n",
    "        ]\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "21f5c1e3",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2024-04-14T16:59:16.114719Z",
     "iopub.status.busy": "2024-04-14T16:59:16.114438Z",
     "iopub.status.idle": "2024-04-14T16:59:22.031349Z",
     "shell.execute_reply": "2024-04-14T16:59:22.030153Z"
    }
   },
   "outputs": [],
   "source": [
    "# merge inflows with duration to get last dates and fill missing values\n",
    "\n",
    "df = (\n",
    "    df_inflows\n",
    "    .join(df_last_dates, on='job_key', how='left')\n",
    "    # jobs without a last_date in the duration files get the latest last_date observed\n",
    "    .with_columns(pl.col('last_date').fill_null(pl.col('last_date').max()))\n",
    "    # bring in the duration-file values; clashing columns get the '_from_duration' suffix\n",
    "    .join(df_missing_features, on='job_key', how='left', suffix='_from_duration')\n",
    "    .with_columns(\n",
    "        [\n",
    "            pl.col(name).fill_null(pl.col(name + '_from_duration'))\n",
    "            for name in ('company_name', 'city', 'job_title')\n",
    "        ]\n",
    "        + [pl.col('searchable_days').fill_null(pl.col('duration_days'))]\n",
    "    )\n",
    "    # last resort for searchable_days: days from first visible to last date, inclusive\n",
    "    .with_columns(\n",
    "        pl.col('searchable_days')\n",
    "        .fill_null((pl.col('last_date') - pl.col('date_first_visible')).dt.days() + 1)\n",
    "    )\n",
    "    .drop([name + '_from_duration' for name in ('company_name', 'city', 'job_title')])\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4eb49948",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2024-04-14T16:59:22.034640Z",
     "iopub.status.busy": "2024-04-14T16:59:22.034392Z",
     "iopub.status.idle": "2024-04-14T17:00:54.250135Z",
     "shell.execute_reply": "2024-04-14T17:00:54.249299Z"
    }
   },
   "outputs": [],
   "source": [
    "# clean data\n",
    "\n",
    "df = tools.clean_company_name(tools.clean_city_name(df))\n",
    "\n",
    "canadian_provinces = [\n",
    "    'ab', 'bc', 'mb', 'nb', 'nl', 'ns', 'nt', 'nu', 'on', 'pe', 'qc', 'sk', 'yt',\n",
    "    'unknown',\n",
    "]\n",
    "\n",
    "df = (\n",
    "    df\n",
    "    # province code 'n' is recoded to 'qc' (CISSS Abitibi-Témiscamingue)\n",
    "    .with_columns(pl.col('province').map_dict({'n': 'qc'}, default=pl.first()))\n",
    "    # keep the listed province codes only; very few obs from US lost\n",
    "    .filter(pl.col('province').is_in(canadian_provinces))\n",
    "    .with_columns(pl.col(pl.Utf8).map_dict(NULL_MAPPER, default=pl.first()))\n",
    "    # negative duration in very recent data will probably be fixed in next update\n",
    "    .filter(pl.col('date_first_visible') <= pl.col('last_date'))\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "50b33971",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2024-04-14T17:00:54.253469Z",
     "iopub.status.busy": "2024-04-14T17:00:54.253219Z",
     "iopub.status.idle": "2024-04-14T17:01:16.622999Z",
     "shell.execute_reply": "2024-04-14T17:01:16.621232Z"
    }
   },
   "outputs": [],
   "source": [
    "# write data\n",
    "\n",
    "# curated output: one parquet file under path_output\n",
    "df.write_parquet(f'{path_output}indeed_all_jobs.parquet')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9feecd98",
   "metadata": {},
   "outputs": [],
   "source": [
    "# preview the five most recently posted jobs\n",
    "df.sort('date_first_visible', descending=True).head(5)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.10.12 ('env_indeed2')",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  },
  "vscode": {
   "interpreter": {
    "hash": "7120590dfa35e6512fb14e5e70b67446c3e78c7a5c027e908dbb14d6a3f8a0eb"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
